<?php
namespace app\api\services;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Message\AMQPMessage;
use think\facade\Config;
use think\facade\Db;

class RabbitMqPublishService
{
    private static $instance;
    private $connection;
    private $channel;

    private function __construct()
    {
        $this->connect();
    }

    public static function getInstance(): self
    {
        if (null === self::$instance) {
            self::$instance = new self();
        }
        return self::$instance;
    }

    private function connect()
    {
        $config = Config::get('rabbitmq.hosts');
        try {
            $this->connection = new AMQPStreamConnection(
                $config['host'],
                $config['port'],
                $config['username'],
                $config['password'],
                $config['vhost']
            );
            $this->channel = $this->connection->channel();
        } catch (AMQPConnectionClosedException $e) {
            $this->channel = null;
            // 可以记录日志或抛出异常
            // 此处根据业务需求处理连接失败的情况
        }
    }

    public function publish($conf, $data, $insertTest = true)
    {
        if ( ! $this->channel) {
            $this->connect();
        }

        try {
            $this->channel->queue_declare(
                $conf['queue_name'],
                false,
                true,
                false,
                false,
                false,
                $conf['queue_args'] ?? []
            );

            $this->channel->exchange_declare(
                $conf['exchange_name'],
                $conf['exchange_type'] ?? 'direct',
                false,
                true,
                false,
                false,
                false,
                $conf['exchange_args'] ?? []
            );

            $this->channel->queue_bind(
                $conf['queue_name'],
                $conf['exchange_name'],
                $conf['route_key']
            );

            $messageBody = json_encode($data, JSON_UNESCAPED_UNICODE);
            // var_dump($messageBody);exit;
            $message = new AMQPMessage($messageBody, [
                'content_type'  => 'application/json',
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
            ]);

            $this->channel->basic_publish(
                $message,
                $conf['exchange_name'],
                $conf['route_key']
            );

            if ($insertTest) {
                Db::name('test')->insert([
                    'param' => "发送列队实体内容：" . $messageBody . "\n 路由信息：" . json_encode($conf, JSON_UNESCAPED_UNICODE),
                ]);
            }

        } catch (AMQPConnectionClosedException $e) {
            $this->channel = null;
            // 重试逻辑或记录日志
            $this->connect(); // 重新建立连接
            return $this->send($event, $data, $insertTest); // 递归重试发送
        }

        return true;
    }

    public function stop()
    {
        if ($this->channel) {
            $this->channel->close();
        }
        if ($this->connection) {
            $this->connection->close();
        }
    }
}

// 使用示例
// $rabbitMqService = RabbitMqPublishService::getInstance();
// $result = $rabbitMqService->send($event, $data);
// if ($result) {
//     // 消息发送成功
// } else {
//     // 消息发送失败
// }
// $rabbitMqService->stop();
